package com.niit.TopN.CityTopN;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.niit.bean.StudentBean;
import com.niit.TopOne.schooltopone.SchoolGroupingComparator;


public class CityTopNDriver {
    public static void main(String[] args) throws Exception{
        //配置文件对象
        Configuration conf = new Configuration();
        // 创建作业实例
        Job job = Job.getInstance(conf, CityTopNDriver.class.getSimpleName());
        job.setJarByClass(CityTopNDriver.class);
        job.setMapperClass(CityTopNMapper.class);
        job.setReducerClass(CityTopNReducer.class);
        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(StudentBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(StudentBean.class);
        job.setOutputValueClass(NullWritable.class);
        job.setGroupingComparatorClass(SchoolGroupingComparator.class);
        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, new Path("input/covid"));
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, new Path("output/covid/topN/CityTopN"));
        //判断输出路径是否存在 如果存在删除
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(new Path("output/covid/topN/CityTopN"))){
            fs.delete(new Path("output/covid/topN/CityTopN"),true);
        }
        // 提交作业并等待执行完成
        boolean resultFlag = job.waitForCompletion(true);
        //程序退出
        System.exit(resultFlag ? 0 :1);
    }




}
